# Producer 源码分析
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 1 消息发送整体流程
下面是一个生产者发送消息的demo(同步发送)
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
主要做了几件事:
- 初始化一个生产者(DefaultMQProducer)对象
- 设置 NameServer 的地址
- 启动生产者
- 发送消息
# 2 消息发送者启动流程
RocketMQ 的 Producer 启动流程由 producer.start() 触发,最终通过 DefaultMQProducerImpl 完成客户端初始化、网络组件启动、路由定时刷新等行为。下图展示核心流程概览:

# 2.1 Producer.start() 入口
DefaultMQProducer.start() 本质调用的是内部实现类:
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
//todo Producer的启动流程核心是defaultMQProducerImpl.start();
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("acl dispatcher start failed ", e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# 2.2 生产者核心启动流程
DefaultMQProducerImpl.start(), 生产者启动, 截取关键流程代码
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 基础配置校验(ProducerGroup 是否合规)
this.checkConfig();
// 2. instanceName 置为进程 PID(避免冲突)
if (!this.defaultMQProducer.getProducerGroup()
.equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 3. 获取/创建 MQClientInstance(JVM 全局唯一)
this.mQClientFactory =
MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 4. 注册 Producer 到 MQClientInstance(Producer → MQClientInstance 映射)
boolean registerOK =
mQClientFactory.registerProducer(
this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
throw new MQClientException("producer group duplicate", null);
}
// 5. 创建默认 Topic 路由缓存
this.topicPublishInfoTable.put(
this.defaultMQProducer.getCreateTopicKey(),
new TopicPublishInfo());
// 6. 启动 MQClientInstance(真正启动网络通信、定时任务、拉取线程)
if (startFactory) {
mQClientFactory.start();
}
this.serviceState = ServiceState.RUNNING;
break;
default:
throw new MQClientException("Producer already started or illegal state", null);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
核心总结: Producer 本身只是“轻量客户端”,真正的网络通信、路由管理、心跳发送等全部委托给 MQClientInstance(JVM 内唯一)。
# 2.3 关键步骤解析
# 1. 配置校验:checkConfig()
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
//非空判断
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null);
}
// 禁止使用系统默认组名
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
null);
}
}
2
3
4
5
6
7
8
9
10
11
12
# 2. 获取 MQClientInstance
一个 JVM 中所有 Producer / Consumer 共享一个 MQClientInstance(同 clientId 才会复用)。
ConcurrentMap<String, MQClientInstance> factoryTable;
获取流程:
public MQClientInstance getOrCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId(); // IP@instanceName@unitName
MQClientInstance instance = factoryTable.get(clientId);
if (instance == null) {
instance = new MQClientInstance(...);
MQClientInstance prev = factoryTable.putIfAbsent(clientId, instance);
if (prev != null) instance = prev;
}
return instance;
}
2
3
4
5
6
7
8
9
10
11
clientId 规则:
clientIP @ instanceName @ unitName
➡ 若三个字段一致,则 Producer / Consumer 将共享同一个 MQClientInstance,要特别注意 groupName 冲突问题!
RocketMQ中消息发送者、消息消费者都属于”客户端“
每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。
故不同的生产者、消费端,如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

# 2.4 客户端实际启动流程
Producer 的网络组件、定时任务、消费者线程等,全部由 MQClientInstance 启动:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 获取 NameServer 地址
if (clientConfig.getNamesrvAddr() == null) {
mQClientAPIImpl.fetchNameServerAddr();
}
// 2. 启动 RPC 通道(底层 Netty)
mQClientAPIImpl.start();
// 3. 启动各种定时任务(更新路由、发送心跳等)
startScheduledTask();
// 4. 消费者专用线程:拉消息
pullMessageService.start();
// 5. 消费者专用线程:负载均衡
rebalanceService.start();
// 6. 最终反向调用 Producer.start(false)
defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.serviceState = ServiceState.RUNNING;
break;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 2.5 定时任务
Producer 与 Broker 的关系 不是实时同步,而是依赖定时任务维持, 具体可查看源码startScheduledTask()方法。
关键任务如下:
每 2 分钟:更新 NameServer 地址
fetchNameServerAddr()1每 30 秒:更新 Topic 路由信息
updateTopicRouteInfoFromNameServer()1每 30 秒:向 Broker 发送心跳
维护 Producer/Consumer 与 Broker 的连接。
# 3 消息发送队列选择
Producer 发送消息前,必须从 Topic 的多个 MessageQueue 中选择一个最终队列。RocketMQ 提供两套策略:
- 默认轮询策略(Round-Robin)
- Broker 故障延迟规避策略(Latency Fault Tolerance)
选择策略的流程如下:

两者的核心入口都来自:
DefaultMQProducerImpl.sendDefaultImpl()
→ selectOneMessageQueue()
2
# 3.1 默认选择队列策略
默认情况下,RocketMQ 使用 最简单可靠的轮询算法:
int index = tpInfo.getSendWhichQueue().getAndIncrement();
MessageQueue mq = tpInfo.getMessageQueueList().get(index % queueSize);
2
特点:
- 均匀分布:消息尽可能平均落入每个队列
- 无状态简单:不依赖历史,只根据当前线程的 index
- 适用于网络稳定、Broker基本不可用的情况
注意: 如果发送失败进入重试(默认重试 2 次 → 共 3 次发送尝试),轮询也会跳过失败队列。
# 3.2 故障延迟机制策略
在网络不稳定、Broker 坏节点较容易出现的场景中,可开启:
sendLatencyFaultEnable = true
核心思想:
根据发送耗时/失败情况,为每个 Broker 计算一个“不可用时长”,下次选择队列时跳过这些 Broker。
流程:
- 每次发送消息后记录延迟
- 调用
updateFaultItem()更新 Broker 的延迟与不可用时间窗口 - 下次选队列时,优先跳过不可用的 Broker
- 若所有 Broker 都处于不可用窗口,则选择一个“最不差的 Broker”兜底
# 3.2.1 Producer 发送流程中的队列选择关键部分
核心代码片段(简化):
MessageQueue mqSelected =
this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
2
在 selectOneMessageQueue() 中:
若打开延迟规避:
if (sendLatencyFaultEnable) {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = index % tpInfo.getMessageQueueList().size();
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 只选择可用 Broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
index++;
}
// 所有 Broker 都不可用,兜底选一个“最不差的”
String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
return new MessageQueue(topic, notBestBroker, randomIndex % writeQueueNums);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3.2.2 延迟容错计算逻辑
每次发送成功/失败后会调用:
updateFaultItem(brokerName, latency, isolation)
其中 latency 为发送耗时,失败时按 30s 兜底。
计算“不可用时间窗口”:
| 发送耗时 ≥ | 不可用时长 |
|---|---|
| 50ms | 0 |
| 100ms | 0 |
| 550ms | 30s |
| 1000ms | 60s |
| 2000ms | 120s |
| 3000ms | 180s |
| 15000ms | 600s |
源码:
private long computeNotAvailableDuration(long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return notAvailableDuration[i];
}
return 0;
}
2
3
4
5
6
7
整体效果:
- 发送延迟大表示 Broker 负载高 / 网络有问题 → 临时回避
- 避免连续向“坏节点”发送
- 提高整体发送成功率
# 3.3 两种策略的使用建议
| 场景 | 推荐策略 |
|---|---|
| 内网环境、网络稳定、Broker 基本正常 | 默认轮询(简单高效) |
| 跨机房、网络抖动、Broker 时常不可用 | 延迟容错策略(高可用) |
延迟容错策略优点:
- 自动跳过不可用 Broker
- 显著提升发送成功率
- 避免重试时连续命中坏节点
注意前提: Topic 必须创建在 多个 Broker 上,否则规避没有意义。
# 3.4 技术亮点:ThreadLocal
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
2
3
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//基于ThreadLocal,多线程安全
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
2
3
4
5
6
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//todo 默认不走这里:Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
//对消息队列轮询获取一个队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
//基于index和队列数量取余,确定位置
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
2
3
4
5
6
7
8
9
# 4 客户端建立连接的时机
RocketMQ 的 Producer / Consumer 与 Broker 并不是在 start() 时立即建立连接,而是遵循 按需建立(lazy connect)+ 长连接复用 的设计。
下面通过源码和时序说明详细拆解。
# 4.1 Producer 什么时候与 Broker 建立连接?
调用顺序如下:
producer.start();
SendResult result = producer.send(msg);
2
问题是:
✔ producer.start() 时会连接 Broker 吗?
✘ 不会直接创建与 Broker 的连接
✔ 发送时会连接 Broker 吗? ✔ 是的:第一次发送消息或第一次访问 Broker 时才会建连
RocketMQ 采用 按需创建连接(lazy creating) 的策略。
# 4.2 源码证明
Producer 的发送链路:
send()
→ sendDefaultImpl()
→ sendKernelImpl()
→ doRequest
→ invokeSync()
→ getAndCreateChannel()
2
3
4
5
6
关键代码:
public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeout)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
// ★ 关键点:在这里创建与 Broker 的物理连接(TCP + Netty Channel)
final Channel channel = this.getAndCreateChannel(addr);
return this.invokeSyncImpl(channel, request, timeout);
}
2
3
4
5
6
7
关键结论:
连接的建立发生在第一次需要与 Broker 通信时(例如发送消息)。
# 4.3 按需建连的核心逻辑
MQClientAPIImpl.getAndCreateChannel() 的逻辑如下:
- 查看内部缓存(一个 Broker 地址对应一个 Channel)
- 若已存在且仍可用 → 直接复用
- 若不存在 → 创建新的 Netty Channel
- 建立 TCP 长连接后放入缓存
伪代码结构:
ChannelWrapper cw = channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// ★ 这里真正建立连接(Netty Bootstrap.connect)
Channel ch = this.createNewChannel(addr);
// 放入缓存
channelTables.put(addr, new ChannelWrapper(ch));
return ch;
2
3
4
5
6
7
8
9
10
11
12
这段逻辑直接说明:
客户端在真正需要访问某个 Broker 的时候才创建连接,比如第一次发送消息。
主要由于以下设计目的:
减少无效连接(按需建立更合理)
Producer 未必一启动就会发送消息,也未必发送到所有 Broker。
提高大规模集群网络利用率
假如一个 Topic 有 20 个队列、分布在多个 Broker 上→ Producer 没必要一次性与所有 Broker 建连。
MQClientInstance 已经在 start() 中启动心跳、路由刷新等任务但并不会发起实际业务通信,因此也不会主动触发建连。